package org.yunai.yfserver.server;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.yunai.yfserver.common.LoggerFactory;
import org.yunai.yfserver.message.IMessage;
import org.yunai.yfserver.message.ISessionMessage;
import org.yunai.yfserver.message.sys.EmptySysInternalMessage;
import org.yunai.yfserver.session.ISession;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

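/**
 * Message processor backed by a queue: submitted messages are queued and
 * executed one at a time on a dedicated processing thread.
 * User: yunai
 * Date: 13-3-25
 * Time: 9:17 AM
 */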
public class QueueMessageProcessor implements IMessageProcessor {

    private static final Logger LOGGER_MSG = LoggerFactory.getLogger(LoggerFactory.Logger.msg, QueueMessageProcessor.class);
    private static final Logger LOGGER_SERVER = LoggerFactory.getLogger(LoggerFactory.Logger.msg, QueueMessageProcessor.class);


    /**
     * 消息队列长度<br />
     * <pre>
     *     1. 当前消息队列不设置最大长度
     *     2. 当超过这个长度时，就认为队列已满
     * </pre>
     */
    public static final int CAPACITY = 100;
    /**
     * 消息队列
     */
    private final BlockingQueue<IMessage> queue;
    /**
     * 消息处理线程
     */
    private volatile Thread thread;
    /**
     * 是否已经停止
     */
    private volatile boolean stopped = true;
    /**
     * 处理的消息总数
     */
    private long messageCount = 0;
    /**
     * 等待消息处理线程退出Latch
     */
    private volatile CountDownLatch stopLatch;
    /**
     * 消息轮询Runner
     */
    private Runnable pollingRunner = new Runnable() {
        @Override
        public void run() {
            stopLatch = new CountDownLatch(1);
            try {
                while (!stopped) {
                    process(queue.take());
                }
            } catch (InterruptedException e) {
                LOGGER_MSG.error("[pollingRunner] [取得消息时, 线程被中断] error[{}].", ExceptionUtils.getStackTrace(e));
            } catch (Exception e) {
                LOGGER_MSG.error("[pollingRunner] [执行逻辑时，发生错误] error[{}].", ExceptionUtils.getStackTrace(e));
            } finally {
                stopLatch.countDown();
            }
        }
    };
    /**
     * 是否处理剩余消息<br />
     * TODO 目前用处不是很大
     */
    private final boolean processLeft;
    /**
     * 剩余消息
     */
    private volatile List<IMessage> leftQueue;

    /**
     * 创建不处理剩余消息的消息处理器
     */
    public QueueMessageProcessor() {
        this(false);
    }

    /**
     * @param processLeft 是否要处理剩余消息
     */
    public QueueMessageProcessor(boolean processLeft) {
        queue = new LinkedBlockingQueue();
        this.processLeft = processLeft;
    }

    @Override
    public synchronized void start() {
        stopped = false;

        if (thread != null) {
            throw new IllegalStateException("该消息处理器已经有Thread了!");
        }
        thread = new Thread(pollingRunner, "MessageProcessor Thread");
        thread.start();

        LOGGER_MSG.info("MessageProcessor thread started: {}", thread);
    }

    @Override
    public synchronized void stop() {
        LOGGER_SERVER.info("[stop] [Message processor stop begin].");
        stopped = true;
        try {
            // 停止消息处理线程
            LOGGER_SERVER.info("[stop] [Message processor thread:{} stopping].", thread.getName());
//            if (thread != null) { // TODO 这里没准可以考虑不用interrupt，而靠丢个空消息进去，有空尝试下
//                thread.interrupt();
//            }
            queue.put(new EmptySysInternalMessage()); // 该处去掉上面的thread.interrupt();
            if (stopLatch != null) {
                stopLatch.await();
            }
            LOGGER_SERVER.info("[stop] [Message processor thread:{} stop success].", thread.getName());
            // 将未处理的消息放入到leftQueue中,以备后续处理
            if (processLeft) {
                leftQueue = new LinkedList<IMessage>();
                while (true) {
                    IMessage msg = queue.take();
                    if (msg == null) {
                        LOGGER_SERVER.info("[stop] [find a null msg]");
                        break;
                    }
                    leftQueue.add(msg);
                }
            }
            queue.clear();
            LOGGER_SERVER.info("[stop] [Message processor stop success].");
        } catch (InterruptedException e) {
            LOGGER_SERVER.error("[stop] [Message processor stop fail] [error: {}]", ExceptionUtils.getStackTrace(e));
        }
    }

    @Override
    public void put(IMessage message) {
        if (stopped) {
            LOGGER_MSG.debug("[put] [消息处理器已经停止, 无法添加消息].");
            return;
        }
        try {
            queue.put(message);
            LOGGER_MSG.debug("[put] [当前消息队列长度:{}].", queue.size());
        } catch (InterruptedException e) {
            LOGGER_MSG.error("[put] [添加消息时,线程被中断] error[{}].", ExceptionUtils.getStackTrace(e));
        }
    }

    @Override
    public boolean isFull() {
        return queue.size() >= CAPACITY;
    }

    /**
     * 子类实现对消息的处理
     * @param message 消息
     */
    protected void process(IMessage message) {
        if (message == null) {
            LOGGER_MSG.error("[process] [message is null].");
            return;
        }
        long beginNanoTime = LOGGER_MSG.isInfoEnabled() ? System.nanoTime(): 0;
        messageCount++;
        try {
            message.execute();
        } catch (Exception e) {
            LOGGER_MSG.error("[process] [error:{}].", ExceptionUtils.getStackTrace(e));
            try {
                // TODO 以后这里最好改成增加个handleException的方法，让子类在去实现.
                // 目前先默认实现一个落
                if (message instanceof ISessionMessage<?>) {
                    ISessionMessage sessionMessage = (ISessionMessage<?>) message;
                    ISession session = sessionMessage.getSession();
                    if (session != null && session.isConnected()) {
                        LOGGER_MSG.info("[process] [msg:{} error] [session:{} close]", message.getCode(), session);
                        session.close();
                    }
                }
            } catch (Exception ex) {
                LOGGER_MSG.error("[process] [msg:{} error handle failure] [error: {}]", message.getCode(), ExceptionUtils.getCause(ex));
            }
        } finally {
            if (LOGGER_MSG.isInfoEnabled()) { // 节省下System.nanoTime()的开销
                LOGGER_MSG.info("[process] [Message:{}, Cost:{} ms, Total: {}].", message.getCode(), (System.nanoTime() - beginNanoTime) / 1000000, messageCount);
            }
        }
    }

    /**
     * @return 消息处理线程编号. 可能返回空
     */
    public Long getThreadId() {
        return thread != null ? thread.getId() : null;
    }
}
